Amplify Gen2 で DynamoDB Streams を使って DynamoDB テーブル間のデータ連携を行う
いわさです。
Amplify Gen2 を使って、DynamoDB でスコアを管理するアプリケーションを作成しています。
このアプリケーションでは、スコアが不定期に加算・減算されるイベントが発生します。
これをスコアアクティビティイベントと呼んでおり、外部から AppSync 経由でトリガーされています。構成図にするとこんな感じでしょうか。
スコアアクティビティイベントを受信するだけのアプリ
Amplify の Data コンポーネントでスコアアクティビティテーブルを定義するだけなので、以下のみで実現出来ます。
import { type ClientSchema, a, defineData } from '@aws-amplify/backend';
const schema = a.schema({
ScoreActivities: a
.model({
team_id: a.string().required(),
score_change: a.integer().required(),
})
.authorization((allow) => [
allow.ownerDefinedIn("team_id").identityClaim("custom:team_id"),
allow.publicApiKey()
])
});
:
細切れにチームごとのスコアアクティビティイベントが発生するのですが、チームごとの合計スコアを管理したくなりました。スコアアクティビティテーブルをアプリケーションで集計することも出来るのですが、イベント購読やランキングなどを考えると「合計スコア」テーブルで管理出来ると良いのではと考えました。
そこで、今回は次のように DynamoDB Streams を経由して、スコアアクティビティイベント発生時に Lambda 関数経由で合計スコアテーブルを更新仕組みを実装してみたいと思います。
構成図で表現すると次のピンク色の枠の中を実装するイメージです。
TotalScore テーブルの実装
まずは合計スコアを管理するテーブル「TotalScore」を Data コンポーネント経由で定義します。
これは Amplify Gen2 でスキーマ定義を追加するだけなので簡単に実装が出来ます。
import { type ClientSchema, a, defineData } from '@aws-amplify/backend';
const schema = a.schema({
ScoreActivities: a
.model({
team_id: a.string().required(),
score_change: a.integer().required(),
})
.authorization((allow) => [
allow.ownerDefinedIn("team_id").identityClaim("custom:team_id"),
allow.publicApiKey()
]),
TotalScore: a
.model({
team_id: a.string().required(),
total_score: a.integer().required(),
})
.identifier(['team_id'])
.authorization((allow) => [
allow.ownerDefinedIn("team_id").identityClaim("custom:team_id"),
allow.publicApiKey()
]),
});
:
デプロイ後、Amplify コンソールのデータマネージャーから新しいテーブルが追加されていることを確認出来ました。
DynamoDB Streams から Lambda 関数をトリガー
続いて、ScoreActivities テーブルにアイテムが追加されたタイミングで TotalScore テーブルの合計スコア値を更新したいと思いますので、DynamoDB Streams 経由で Lambda 関数をトリガーします。
Amplify Gen2 では DyanamoDB Streams がデフォルトで有効になっており、Lambda 関数を定義してイベントマッピングを作成してやれば動作します。
Lambda 関数は先程定義した TotalScore へリクエストを送信する必要があるので、Amplify が生成する GraphQL エンドポイント URL と API キーを、Lambda 関数に引き渡します。
先日記事に書いたのですが、このパターンは循環依存が発生するケースになりますので、次の記事を参考に Lambda 関数は Amplify 管理外のカスタムリソースとして定義しました。
最終的な実装は次のような感じに。別途記事にしたいと思いますが StartingPosition の仕様については少し注意したほうが良いです。データ損失が起きる場合があるので。
import { defineBackend } from '@aws-amplify/backend';
import { auth } from './auth/resource';
import { data } from './data/resource';
import { preTokenGenerationV2 } from './auth/pre-token-generation-v2/resource';
import { Effect, PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { EventSourceMapping, StartingPosition } from 'aws-cdk-lib/aws-lambda';
import { NodejsFunction} from 'aws-cdk-lib/aws-lambda-nodejs';
import * as url from 'node:url';
const backend = defineBackend({
auth,
data,
preTokenGenerationV2,
});
const { cfnUserPool } = backend.auth.resources.cfnResources
cfnUserPool.addPropertyOverride("LambdaConfig.PreTokenGenerationConfig",{
LambdaVersion: 'V2_0',
LambdaArn: backend.preTokenGenerationV2.resources.lambda.functionArn,
});
const updateTotalScoreStack = backend.createStack('UpdateTotalScoreStack');
const funcitonUpdateTotalScoreOnActivity = new NodejsFunction(
updateTotalScoreStack,
'update-totalscore-on-activity',
{
entry: url.fileURLToPath(new URL('./functions/update-totalscore-on-activity/handler.ts', import.meta.url)),
environment: {
APPSYNC_ENDPOINT: backend.data.resources.cfnResources.cfnGraphqlApi.attrGraphQlUrl,
APPSYNC_API_KEY: backend.data.resources.cfnResources.cfnApiKey?.attrApiKey || '',
},
initialPolicy: [
new PolicyStatement({
effect: Effect.ALLOW,
actions: [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:ListStreams",
],
resources: ["*"],
}),
],
}
)
new EventSourceMapping(
updateTotalScoreStack,
"TotalScoreFunctionEventSourceMapping",
{
eventSourceArn: backend.data.resources.tables["ScoreActivities"].tableStreamArn,
target: funcitonUpdateTotalScoreOnActivity,
batchSize: 100,
startingPosition: StartingPosition.LATEST,
}
);
Lambda 関数から AppSync 経由で TotalScore を更新
最後に DynamoDB Streams からトリガーされた Lambda 関数で TotalScore テーブルに書き込みを行えば完了です。
今回は、次の記事を参考に Lambda 関数で Axios を使ってリクエストを送信します。記事ではデフォルトプロバイダーで SigV4 署名していますが、私のほうは諸事情により API キーを使いました。
イベントで受信したスコアを TotalScore に加算します。該当キーのアイテムが存在しない場合は作成も行います。
コードは生成 AI に手伝ってもらいながら手直しを入れたものです。便利な世の中になったものだ。
import axios, { AxiosError } from "axios";
import type { DynamoDBStreamHandler } from "aws-lambda";
import { Logger } from "@aws-lambda-powertools/logger";
type TotalScoreInput = {
team_id: string;
total_score: number;
};
type GraphQLError = {
message: string;
path: string[];
errorType: string;
};
const logger = new Logger({
logLevel: "INFO",
serviceName: "dynamodb-stream-handler",
});
// GraphQL操作の定義
const CREATE_TOTAL_SCORE = `
mutation CreateTotalScore($input: CreateTotalScoreInput!) {
createTotalScore(input: $input) {
team_id
total_score
}
}
`;
const UPDATE_TOTAL_SCORE = `
mutation UpdateTotalScore($input: UpdateTotalScoreInput!) {
updateTotalScore(input: $input) {
team_id
total_score
}
}
`;
const GET_TOTAL_SCORE = `
query GetTotalScore($team_id: String!) {
getTotalScore(team_id: $team_id) {
team_id
total_score
}
}
`;
async function executeGraphQLRequest(query: string, variables: any) {
if (!process.env.APPSYNC_ENDPOINT || !process.env.APPSYNC_API_KEY) {
throw new Error("Missing required environment variables: APPSYNC_ENDPOINT or APPSYNC_API_KEY");
}
const response = await axios.post(
process.env.APPSYNC_ENDPOINT,
{
query,
variables,
},
{
headers: {
'Content-Type': 'application/json',
'x-api-key': process.env.APPSYNC_API_KEY,
},
}
);
if (response.data.errors) {
throw new Error(`GraphQL Error: ${JSON.stringify(response.data.errors)}`);
}
return response.data;
}
async function updateOrCreateTotalScore(teamId: string, newTotalScore: number) {
try {
const updateResponse = await executeGraphQLRequest(
UPDATE_TOTAL_SCORE,
{
input: {
team_id: teamId,
total_score: newTotalScore
}
}
);
logger.info('Successfully updated total score', { teamId, newTotalScore });
return updateResponse;
} catch (error) {
if (error instanceof Error) {
if (error.message.includes('ConditionalCheckFailedException')) {
logger.info('Record not found, creating new total score', { teamId, newTotalScore });
const createResponse = await executeGraphQLRequest(
CREATE_TOTAL_SCORE,
{
input: {
team_id: teamId,
total_score: newTotalScore
}
}
);
return createResponse;
}
}
throw error;
}
}
export const handler: DynamoDBStreamHandler = async (event) => {
const batchItemFailures: Array<{itemIdentifier: string}> = [];
logger.info('Starting to process DynamoDB Stream records', {
recordCount: event.Records.length
});
for (const record of event.Records) {
try {
if (record.eventName !== 'INSERT') {
logger.info(`Skipping non-INSERT event: ${record.eventName}`);
continue;
}
const newImage = record.dynamodb?.NewImage;
if (!newImage) {
logger.warn('No NewImage in record, skipping');
continue;
}
const teamId = newImage.team_id?.S;
const scoreChange = parseInt(newImage.score_change?.N ?? '0');
if (!teamId || scoreChange === 0) {
logger.warn('Invalid record data', { teamId, scoreChange });
continue;
}
const getTotalScoreResponse = await executeGraphQLRequest(
GET_TOTAL_SCORE,
{ team_id: teamId }
);
const currentScore = getTotalScoreResponse.data.getTotalScore?.total_score ?? 0;
const newTotalScore = currentScore + scoreChange;
const result = await updateOrCreateTotalScore(teamId, newTotalScore);
logger.info('Successfully processed score change', {
teamId,
scoreChange,
currentScore,
newTotalScore,
operation: result.data.createTotalScore ? 'create' : 'update'
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
const errorStack = error instanceof Error ? error.stack : undefined;
logger.error('Error processing record', {
error: errorMessage,
stack: errorStack,
recordId: record.eventID
});
if (record.eventID) {
batchItemFailures.push({
itemIdentifier: record.eventID
});
}
}
}
return { batchItemFailures };
};
実行結果
ではデプロイ後に試してみます。
まずは ScoreActivities テーブルで、team1 に対して 1000 を加算します。
TotalScore を確認してみると、team1 の合計スコアが 1000 になっていることを確認出来ました。新規作成の部分うまく動いていそうです。
続いて同じチームに対して、さらに 200 加算します。
1200 に加算されました。更新も出来てますね。
続いて違うチームのスコアを発生させてみました。
既存のチームには影響なく新しいチームのアイテムが作成されました。
この時点では team1 のほうがスコアが高いです。
ここで team1 に対して減算イベントを発生させます。-300 です。
減算も出来てますね。team2 のほうが合計スコアが大きくなりました。
さいごに
本日は Amplify Gen2 で DynamoDB Streams を使って DynamoDB テーブル間のデータ連携を行う方法として、 AppSync スキーマ間の連携に DynamoDB Streams 経由を使ってみました。まぁ悪くないのではと思いますがどうでしょう。
Amplify + TypeScript 力があまり高くないもので、生成 AI ベースの Lambda 関数となっているので、見る人が見ると「なんだこのコードは」となるのかもしれません。そのあたりは大目に見てください。